summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-27 03:01:57 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-26 16:01:57 -0800
commit4b6a2f541dece7256271374ca6555ce12a76a99b (patch)
tree3e5ea5ddc20ee999bd5d32e146be96b43d694ff0 /Rx
parentad101cf093028f874a5d882967bdb4ccb8032eb8 (diff)
downloadRxCpp-4b6a2f541dece7256271374ca6555ce12a76a99b.tar.gz
decouple publish, publish_synchronized, multicast from observable (#338)
* decouple publish, publish_synchronized, multicast from observable * decouple publish, publish_synchronized, multicast from observable - restore include structure
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-multicast.hpp68
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-publish.hpp141
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp116
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp20
-rw-r--r--Rx/v2/test/operators/publish.cpp2
-rw-r--r--Rx/v2/test/subscriptions/subscription.cpp1
7 files changed, 211 insertions, 139 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp
index e970bfe..193a11a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp
@@ -2,6 +2,15 @@
#pragma once
+/*! \file rx-multicast.hpp
+
+ \brief allows connections to the source to be independent of subscriptions.
+
+ \tparam Subject the subject to multicast the source Observable.
+
+ \param sub the subject.
+*/
+
#if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP)
#define RXCPP_OPERATORS_RX_MULTICAST_HPP
@@ -13,6 +22,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct multicast_invalid_arguments {};
+
+template<class... AN>
+struct multicast_invalid : public rxo::operator_base<multicast_invalid_arguments<AN...>> {
+ using type = observable<multicast_invalid_arguments<AN...>, multicast_invalid<AN...>>;
+};
+template<class... AN>
+using multicast_invalid_t = typename multicast_invalid<AN...>::type;
+
template<class T, class Observable, class Subject>
struct multicast : public operator_base<T>
{
@@ -65,33 +84,40 @@ struct multicast : public operator_base<T>
}
};
-template<class Subject>
-class multicast_factory
-{
- Subject caster;
-public:
- multicast_factory(Subject sub)
- : caster(std::move(sub))
- {
- }
- template<class Observable>
- auto operator()(Observable&& source)
- -> connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject>> {
- return connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject>>(
- multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject>(std::forward<Observable>(source), caster));
- }
-};
-
}
-template<class Subject>
-inline auto multicast(Subject sub)
- -> detail::multicast_factory<Subject> {
- return detail::multicast_factory<Subject>(std::move(sub));
+/*! @copydoc rx-multicast.hpp
+*/
+template<class... AN>
+auto multicast(AN&&... an)
+ -> operator_factory<multicast_tag, AN...> {
+ return operator_factory<multicast_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<multicast_tag>
+{
+ template<class Observable, class Subject,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Subject>>,
+ class Value = rxu::value_type_t<Multicast>,
+ class Result = connectable_observable<Value, Multicast>>
+ static Result member(Observable&& o, Subject&& sub) {
+ return Result(Multicast(std::forward<Observable>(o), std::forward<Subject>(sub)));
+ }
+
+ template<class... AN>
+ static operators::detail::multicast_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "multicast takes (Subject)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
index 3589cc9..dc38191 100644
--- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
@@ -2,10 +2,32 @@
#pragma once
+/*! \file rx-publish.hpp
+
+ \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
+ Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.
+
+ \tparam T the type of the emitted item (optional).
+
+ \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional).
+ \param cs the subscription to control lifetime (optional).
+
+ \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
+
+ \sample
+ \snippet publish.cpp publish subject sample
+ \snippet output.txt publish subject sample
+
+ \sample
+ \snippet publish.cpp publish behavior sample
+ \snippet output.txt publish behavior sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
#define RXCPP_OPERATORS_RX_PUBLISH_HPP
#include "../rx-includes.hpp"
+#include "./rx-multicast.hpp"
namespace rxcpp {
@@ -13,29 +35,118 @@ namespace operators {
namespace detail {
-template<template<class T> class Subject>
-class publish_factory
-{
-public:
- publish_factory() {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>>>> {
- return connectable_observable<rxu::value_type_t<rxu::decay_t<Observable>>, multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>>>>(
- multicast<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Subject<rxu::value_type_t<rxu::decay_t<Observable>>>>(
- std::forward<Observable>(source), Subject<rxu::value_type_t<rxu::decay_t<Observable>>>()));
- }
+template<class... AN>
+struct publish_invalid_arguments {};
+
+template<class... AN>
+struct publish_invalid : public rxo::operator_base<publish_invalid_arguments<AN...>> {
+ using type = observable<publish_invalid_arguments<AN...>, publish_invalid<AN...>>;
};
+template<class... AN>
+using publish_invalid_t = typename publish_invalid<AN...>::type;
+
+}
+/*! @copydoc rx-publish.hpp
+*/
+template<class... AN>
+auto publish(AN&&... an)
+ -> operator_factory<publish_tag, AN...> {
+ return operator_factory<publish_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
-inline auto publish()
- -> detail::publish_factory<rxsub::subject> {
- return detail::publish_factory<rxsub::subject>();
+/*! \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
+
+ \tparam Coordination the type of the scheduler.
+
+ \param cn a scheduler all values are queued and delivered on.
+ \param cs the subscription to control lifetime (optional).
+
+ \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
+
+ \sample
+ \snippet publish.cpp publish_synchronized sample
+ \snippet output.txt publish_synchronized sample
+*/
+template<class... AN>
+auto publish_synchronized(AN&&... an)
+ -> operator_factory<publish_synchronized_tag, AN...> {
+ return operator_factory<publish_synchronized_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<publish_tag>
+{
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Subject = rxsub::subject<SourceValue>,
+ class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
+ class Result = connectable_observable<SourceValue, Multicast>
+ >
+ static Result member(Observable&& o) {
+ return Result(Multicast(std::forward<Observable>(o), Subject(composite_subscription())));
+ }
+
+ template<class Observable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Subject = rxsub::subject<SourceValue>,
+ class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
+ class Result = connectable_observable<SourceValue, Multicast>
+ >
+ static Result member(Observable&& o, composite_subscription cs) {
+ return Result(Multicast(std::forward<Observable>(o), Subject(cs)));
+ }
+
+ template<class Observable, class T,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Subject = rxsub::behavior<SourceValue>,
+ class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
+ class Result = connectable_observable<SourceValue, Multicast>
+ >
+ static Result member(Observable&& o, T first, composite_subscription cs = composite_subscription()) {
+ return Result(Multicast(std::forward<Observable>(o), Subject(first, cs)));
+ }
+
+ template<class... AN>
+ static operators::detail::publish_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "publish takes (optional CompositeSubscription) or (T, optional CompositeSubscription)");
+ }
+};
+
+template<>
+struct member_overload<publish_synchronized_tag>
+{
+ template<class Observable, class Coordination,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_coordination<Coordination>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Subject = rxsub::synchronize<SourceValue, rxu::decay_t<Coordination>>,
+ class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
+ class Result = connectable_observable<SourceValue, Multicast>
+ >
+ static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
+ return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs)));
+ }
+
+ template<class... AN>
+ static operators::detail::publish_invalid_t<AN...> member(AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "publish_synchronized takes (Coordination, optional CompositeSubscription)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index c3b7d9d..e8c9b8c 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -203,9 +203,11 @@
#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
+#include "operators/rx-multicast.hpp"
#include "operators/rx-observe_on.hpp"
#include "operators/rx-on_error_resume_next.hpp"
#include "operators/rx-pairwise.hpp"
+#include "operators/rx-publish.hpp"
#include "operators/rx-reduce.hpp"
#include "operators/rx-repeat.hpp"
#include "operators/rx-replay.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 0a5ccbe..7a950c5 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1161,123 +1161,37 @@ public:
return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
}
- /// \cond SHOW_SERVICE_MEMBERS
- /// multicast ->
- /// allows connections to the source to be independent of subscriptions
- ///
- template<class Subject>
- auto multicast(Subject sub) const
- -> connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>> {
- return connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>>(
- rxo::detail::multicast<T, this_type, Subject>(*this, std::move(sub)));
- }
- /// \endcond
-
- /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
-
- \tparam Coordination the type of the scheduler
-
- \param cn a scheduler all values are queued and delivered on
- \param cs the subscription to control lifetime
-
- \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
-
- \sample
- \snippet publish.cpp publish_synchronized sample
- \snippet output.txt publish_synchronized sample
- */
- template<class Coordination>
- auto publish_synchronized(Coordination cn, composite_subscription cs = composite_subscription()) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs)))
- /// \endcond
- {
- return multicast(rxsub::synchronize<T, Coordination>(std::move(cn), cs));
- }
-
- /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
-
- \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
-
- \sample
- \snippet publish.cpp publish subject sample
- \snippet output.txt publish subject sample
- */
- template<class... AN>
- auto publish(AN**...) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(composite_subscription())))
- /// \endcond
- {
- composite_subscription cs;
- return multicast(rxsub::subject<T>(cs));
- static_assert(sizeof...(AN) == 0, "publish() was passed too many arguments.");
- }
-
- /*! Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
-
- \param cs the subscription to control lifetime
-
- \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
-
- \sample
- \snippet publish.cpp publish subject sample
- \snippet output.txt publish subject sample
- */
+ /*! @copydoc rx-muticast.hpp
+ */
template<class... AN>
- auto publish(composite_subscription cs, AN**...) const
+ auto multicast(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS multicast(rxsub::subject<T>(cs)))
+ -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return multicast(rxsub::subject<T>(cs));
- static_assert(sizeof...(AN) == 0, "publish(composite_subscription) was passed too many arguments.");
+ return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.
-
- \tparam T the type of the emitted item
-
- \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection
-
- \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
-
- \sample
- \snippet publish.cpp publish behavior sample
- \snippet output.txt publish behavior sample
- */
+ /*! @copydoc rx-publish.hpp
+ */
template<class... AN>
- auto publish(T first, AN**...) const
+ auto publish(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, composite_subscription())))
+ -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- composite_subscription cs;
- return multicast(rxsub::behavior<T>(first, cs));
- static_assert(sizeof...(AN) == 0, "publish(value_type) was passed too many arguments.");
+ return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.
-
- \tparam T the type of the emitted item
-
- \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection
- \param cs the subscription to control lifetime
-
- \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
-
- \sample
- \snippet publish.cpp publish behavior sample
- \snippet output.txt publish behavior sample
- */
+ /*! @copydoc rxcpp::operators::publish_synchronized
+ */
template<class... AN>
- auto publish(T first, composite_subscription cs, AN**...) const
+ auto publish_synchronized(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS multicast(rxsub::behavior<T>(first, cs)))
+ -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return multicast(rxsub::behavior<T>(first, cs));
- static_assert(sizeof...(AN) == 0, "publish(value_type, composite_subscription) was passed too many arguments.");
+ return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-replay.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index aeffac9..dda27f7 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -97,8 +97,6 @@ public:
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-lift.hpp"
-#include "operators/rx-multicast.hpp"
-#include "operators/rx-publish.hpp"
#include "operators/rx-ref_count.hpp"
#include "operators/rx-subscribe.hpp"
@@ -256,6 +254,13 @@ struct merge_tag {
};
};
+struct multicast_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-multicast.hpp>");
+ };
+};
+
struct observe_on_tag {
template<class Included>
struct include_header{
@@ -297,6 +302,14 @@ struct pairwise_tag {
};
};
+struct publish_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-publish.hpp>");
+ };
+};
+struct publish_synchronized_tag : publish_tag {};
+
struct repeat_tag {
template<class Included>
struct include_header{
@@ -489,4 +502,7 @@ struct zip_tag {
}
+#include "operators/rx-multicast.hpp"
+#include "operators/rx-publish.hpp"
+
#endif
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index 62d81ff..316f275 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -1,4 +1,6 @@
#include "../test.h"
+#include <rxcpp/operators/rx-publish.hpp>
+
SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){
GIVEN("a range"){
diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp
index a4ce52d..fb351bd 100644
--- a/Rx/v2/test/subscriptions/subscription.cpp
+++ b/Rx/v2/test/subscriptions/subscription.cpp
@@ -3,6 +3,7 @@
#include "rxcpp/operators/rx-map.hpp"
#include "rxcpp/operators/rx-take.hpp"
#include "rxcpp/operators/rx-observe_on.hpp"
+#include "rxcpp/operators/rx-publish.hpp"
SCENARIO("observe subscription", "[hide]"){
GIVEN("observable of ints"){