summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-28 12:41:20 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-28 09:44:06 -0800
commit69abcd24d9ddc3490b5adeac213089318d05e4b8 (patch)
tree87a36fc091dc8a9367af6c03f5acd93a94d32441
parent00c87dd3a6804666a45cefd9ba9cb3dbcd8c68e4 (diff)
downloadRxCpp-69abcd24d9ddc3490b5adeac213089318d05e4b8.tar.gz
decouple ref_count from connectable_observable
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ref_count.hpp70
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp19
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp9
-rw-r--r--Rx/v2/test/operators/publish.cpp2
-rw-r--r--Rx/v2/test/subscriptions/subscription.cpp1
5 files changed, 68 insertions, 33 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
index 886c605..55dde05 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
@@ -2,6 +2,14 @@
#pragma once
+/*! \file rx-ref_count.hpp
+
+ \brief takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source.
+ The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection.
+
+ \return An observable that emitting the items from its source.
+ */
+
#if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
#define RXCPP_OPERATORS_RX_REF_COUNT_HPP
@@ -13,6 +21,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct ref_count_invalid_arguments {};
+
+template<class... AN>
+struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments<AN...>> {
+ using type = observable<ref_count_invalid_arguments<AN...>, ref_count_invalid<AN...>>;
+};
+template<class... AN>
+using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;
+
template<class T, class ConnectableObservable>
struct ref_count : public operator_base<T>
{
@@ -59,33 +77,41 @@ struct ref_count : public operator_base<T>
}
};
-class ref_count_factory
-{
-public:
- ref_count_factory() {}
- template<class... TN>
- auto operator()(connectable_observable<TN...>&& source)
- -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
- return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
- ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source)));
- }
- template<class... TN>
- auto operator()(const connectable_observable<TN...>& source)
- -> observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
- return observable<rxu::value_type_t<connectable_observable<TN...>>, ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
- ref_count<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source));
- }
-};
-
}
-inline auto ref_count()
- -> detail::ref_count_factory {
- return detail::ref_count_factory();
+/*! @copydoc rx-ref_count.hpp
+*/
+template<class... AN>
+auto ref_count(AN&&... an)
+ -> operator_factory<ref_count_tag, AN...> {
+ return operator_factory<ref_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+}
+
}
-}
+template<>
+struct member_overload<ref_count_tag>
+{
+ template<class ConnectableObservable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_connectable_observable<ConnectableObservable>>,
+ class SourceValue = rxu::value_type_t<ConnectableObservable>,
+ class RefCount = rxo::detail::ref_count<SourceValue, rxu::decay_t<ConnectableObservable>>,
+ class Value = rxu::value_type_t<RefCount>,
+ class Result = observable<Value, RefCount>
+ >
+ static Result member(ConnectableObservable&& o) {
+ return Result(RefCount(std::forward<ConnectableObservable>(o)));
+ }
+ template<class... AN>
+ static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
index 4c86994..7038e24 100644
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
@@ -162,16 +162,15 @@ public:
return cs;
}
- /// ref_count ->
- /// takes a connectable_observable source and uses a ref_count of the subscribers
- /// to control the connection to the published source. The first subscription
- /// will cause a call to connect() and the last unsubscribe will unsubscribe the
- /// connection.
- ///
- auto ref_count() const
- -> observable<T, rxo::detail::ref_count<T, this_type>> {
- return observable<T, rxo::detail::ref_count<T, this_type>>(
- rxo::detail::ref_count<T, this_type>(*this));
+ /*! @copydoc rx-ref_count.hpp
+ */
+ template<class... AN>
+ auto ref_count(AN... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
+ {
+ return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-connect_forever.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 4d05dc2..0fa5d58 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -96,7 +96,6 @@ public:
}
#include "operators/rx-lift.hpp"
-#include "operators/rx-ref_count.hpp"
#include "operators/rx-subscribe.hpp"
namespace rxcpp {
@@ -301,6 +300,13 @@ struct average_tag : reduce_tag {};
struct min_tag : reduce_tag {};
struct max_tag : reduce_tag {};
+struct ref_count_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-ref_count.hpp>");
+ };
+};
+
struct pairwise_tag {
template<class Included>
struct include_header{
@@ -510,5 +516,6 @@ struct zip_tag {
#include "operators/rx-multicast.hpp"
#include "operators/rx-publish.hpp"
+#include "operators/rx-ref_count.hpp"
#endif
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index e86374b..87c8fff 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -1,6 +1,8 @@
#include "../test.h"
#include <rxcpp/operators/rx-publish.hpp>
#include <rxcpp/operators/rx-connect_forever.hpp>
+#include <rxcpp/operators/rx-ref_count.hpp>
+
SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){
GIVEN("a range"){
diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp
index fb351bd..2a9f15d 100644
--- a/Rx/v2/test/subscriptions/subscription.cpp
+++ b/Rx/v2/test/subscriptions/subscription.cpp
@@ -4,6 +4,7 @@
#include "rxcpp/operators/rx-take.hpp"
#include "rxcpp/operators/rx-observe_on.hpp"
#include "rxcpp/operators/rx-publish.hpp"
+#include "rxcpp/operators/rx-ref_count.hpp"
SCENARIO("observe subscription", "[hide]"){
GIVEN("observable of ints"){