summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-27 22:56:57 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-27 14:07:26 -0800
commit00c87dd3a6804666a45cefd9ba9cb3dbcd8c68e4 (patch)
treeee7a7189cbf32225fd2cba4d228725f70446b4ee
parent4b6a2f541dece7256271374ca6555ce12a76a99b (diff)
downloadRxCpp-00c87dd3a6804666a45cefd9ba9cb3dbcd8c68e4.tar.gz
decouple connect_forever from connectable_observable
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp68
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp19
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp3
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/publish.cpp2
5 files changed, 65 insertions, 35 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
index 1117a48..bcbe830 100644
--- a/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
@@ -2,6 +2,14 @@
#pragma once
+/*! \file rx-connect_forever.hpp
+
+ \brief takes a connectable_observable source and calls connect during the construction of the expression.
+ This means that the source starts running without any subscribers and continues running after all subscriptions have been unsubscribed.
+
+ \return An observable that emitting the items from its source.
+ */
+
#if !defined(RXCPP_OPERATORS_RX_CONNECT_FOREVER_HPP)
#define RXCPP_OPERATORS_RX_CONNECT_FOREVER_HPP
@@ -13,6 +21,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct connect_forever_invalid_arguments {};
+
+template<class... AN>
+struct connect_forever_invalid : public rxo::operator_base<connect_forever_invalid_arguments<AN...>> {
+ using type = observable<connect_forever_invalid_arguments<AN...>, connect_forever_invalid<AN...>>;
+};
+template<class... AN>
+using connect_forever_invalid_t = typename connect_forever_invalid<AN...>::type;
+
template<class T, class ConnectableObservable>
struct connect_forever : public operator_base<T>
{
@@ -32,33 +50,41 @@ struct connect_forever : public operator_base<T>
}
};
-class connect_forever_factory
-{
-public:
- connect_forever_factory() {}
- template<class... TN>
- auto operator()(connectable_observable<TN...>&& source)
- -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
- return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
- connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(std::move(source)));
- }
- template<class... TN>
- auto operator()(const connectable_observable<TN...>& source)
- -> observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>> {
- return observable<rxu::value_type_t<connectable_observable<TN...>>, connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>>(
- connect_forever<rxu::value_type_t<connectable_observable<TN...>>, connectable_observable<TN...>>(source));
- }
-};
-
}
-inline auto connect_forever()
- -> detail::connect_forever_factory {
- return detail::connect_forever_factory();
+/*! @copydoc rx-connect_forever.hpp
+*/
+template<class... AN>
+auto connect_forever(AN&&... an)
+-> operator_factory<connect_forever_tag, AN...> {
+ return operator_factory<connect_forever_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<connect_forever_tag>
+{
+ template<class ConnectableObservable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_connectable_observable<ConnectableObservable>>,
+ class SourceValue = rxu::value_type_t<ConnectableObservable>,
+ class ConnectForever = rxo::detail::connect_forever<SourceValue, rxu::decay_t<ConnectableObservable>>,
+ class Value = rxu::value_type_t<ConnectForever>,
+ class Result = observable<Value, ConnectForever>
+ >
+ static Result member(ConnectableObservable&& o) {
+ return Result(ConnectForever(std::forward<ConnectableObservable>(o)));
+ }
+
+ template<class... AN>
+ static operators::detail::connect_forever_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "connect_forever takes no arguments");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
index 373c16c..4c86994 100644
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
@@ -174,16 +174,15 @@ public:
rxo::detail::ref_count<T, this_type>(*this));
}
- /// connect_forever ->
- /// takes a connectable_observable source and calls connect during
- /// the construction of the expression. This means that the source
- /// starts running without any subscribers and continues running
- /// after all subscriptions have been unsubscribed.
- ///
- auto connect_forever() const
- -> observable<T, rxo::detail::connect_forever<T, this_type>> {
- return observable<T, rxo::detail::connect_forever<T, this_type>>(
- rxo::detail::connect_forever<T, this_type>(*this));
+ /*! @copydoc rx-connect_forever.hpp
+ */
+ template<class... AN>
+ auto connect_forever(AN... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
+ {
+ return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...);
}
};
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index e8c9b8c..66f5537 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -191,6 +191,7 @@
#include "operators/rx-combine_latest.hpp"
#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
+#include "operators/rx-connect_forever.hpp"
#include "operators/rx-debounce.hpp"
#include "operators/rx-delay.hpp"
#include "operators/rx-distinct.hpp"
@@ -203,11 +204,9 @@
#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
-#include "operators/rx-multicast.hpp"
#include "operators/rx-observe_on.hpp"
#include "operators/rx-on_error_resume_next.hpp"
#include "operators/rx-pairwise.hpp"
-#include "operators/rx-publish.hpp"
#include "operators/rx-reduce.hpp"
#include "operators/rx-repeat.hpp"
#include "operators/rx-replay.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index dda27f7..4d05dc2 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -95,7 +95,6 @@ public:
}
-#include "operators/rx-connect_forever.hpp"
#include "operators/rx-lift.hpp"
#include "operators/rx-ref_count.hpp"
#include "operators/rx-subscribe.hpp"
@@ -170,6 +169,13 @@ struct concat_map_tag {
};
};
+struct connect_forever_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-connect_forever.hpp>");
+ };
+};
+
struct debounce_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index 316f275..e86374b 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -1,6 +1,6 @@
#include "../test.h"
#include <rxcpp/operators/rx-publish.hpp>
-
+#include <rxcpp/operators/rx-connect_forever.hpp>
SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){
GIVEN("a range"){