summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2017-01-21 01:31:18 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-21 09:14:18 -0800
commit691498ee639980bff0218681cbf5d1ef2882ee06 (patch)
treee88751f51bca63e73bc5bb0606ba6a7d3fb27564 /Rx
parent1a5d65be7548069c4df12750f5d8f78bb852c6c2 (diff)
downloadRxCpp-691498ee639980bff0218681cbf5d1ef2882ee06.tar.gz
decouple start_with from observable
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-start_with.hpp77
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp28
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/group_by.cpp1
-rw-r--r--Rx/v2/test/operators/start_with.cpp221
7 files changed, 293 insertions, 44 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-start_with.hpp b/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
index 42bf388..8d82e83 100644
--- a/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
@@ -2,10 +2,32 @@
#pragma once
+/*! \file rx-start_with.hpp
+
+ \brief Start with the supplied values, then concatenate this observable.
+
+ \tparam Value0 ...
+ \tparam ValueN the type of sending values
+
+ \param v0 ...
+ \param vn values to send
+
+ \return Observable that emits the specified items and then emits the items emitted by the source observable.
+
+ \sample
+ \snippet start_with.cpp short start_with sample
+ \snippet output.txt short start_with sample
+
+ Another form of this operator, rxcpp::observable<void, void>::start_with, gets the source observable as a parameter:
+ \snippet start_with.cpp full start_with sample
+ \snippet output.txt full start_with sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_START_WITH_HPP)
#define RXCPP_OPERATORS_RX_START_WITH_HPP
#include "../rx-includes.hpp"
+#include "./rx-concat.hpp"
namespace rxcpp {
@@ -13,34 +35,47 @@ namespace operators {
namespace detail {
-template<class StartObservable>
-class start_with_factory
-{
-public:
- using start_type = rxu::decay_t<StartObservable>;
-
- start_type start;
-
- explicit start_with_factory(start_type s) : start(s) {}
-
- template<class Observable>
- auto operator()(Observable source)
- -> decltype(start.concat(source)) {
- return start.concat(source);
- }
+template<class... AN>
+struct start_with_invalid_arguments {};
+
+template<class... AN>
+struct start_with_invalid : public rxo::operator_base<start_with_invalid_arguments<AN...>> {
+ using type = observable<start_with_invalid_arguments<AN...>, start_with_invalid<AN...>>;
};
-
+template<class... AN>
+using start_with_invalid_t = typename start_with_invalid<AN...>::type;
+
}
-template<class Value0, class... ValueN>
-auto start_with(Value0 v0, ValueN... vn)
- -> detail::start_with_factory<decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...))> {
- return detail::start_with_factory<decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...))>(
- rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...));
+/*! @copydoc rx-start_with.hpp
+*/
+template<class... AN>
+auto start_with(AN&&... an)
+ -> operator_factory<start_with_tag, AN...> {
+ return operator_factory<start_with_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<start_with_tag>
+{
+ template<class Observable, class Value0, class... ValueN,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>>
+ static auto member(Observable&& o, Value0&& v0, ValueN&&... vn)
+ -> decltype(rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...).concat(std::forward<Observable>(o))) {
+ return rxs::from(rxu::decay_t<Value0>(v0), rxu::decay_t<Value0>(vn)...).concat(std::forward<Observable>(o));
+ }
+
+ template<class... AN>
+ static operators::detail::start_with_invalid_t<AN...> member(const AN&...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "start_with takes (Value0, optional ValueN...)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index bc2a435..d43dc3b 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -214,6 +214,7 @@
#include "operators/rx-skip.hpp"
#include "operators/rx-skip_last.hpp"
#include "operators/rx-skip_until.hpp"
+#include "operators/rx-start_with.hpp"
#include "operators/rx-switch_if_empty.hpp"
#include "operators/rx-switch_on_next.hpp"
#include "operators/rx-take.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 9afceae..2863d43 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1763,31 +1763,15 @@ public:
return observable_member(retry_tag{}, *this, std::forward<AN>(an)...);
}
- /*! Start with the supplied values, then concatenate this observable.
-
- \tparam Value0 ...
- \tparam ValueN the type of sending values
-
- \param v0 ...
- \param vn values to send
-
- \return Observable that emits the specified items and then emits the items emitted by the source observable.
-
- \sample
- \snippet start_with.cpp short start_with sample
- \snippet output.txt short start_with sample
-
- Another form of this operator, rxcpp::observable<void, void>::start_with, gets the source observable as a parameter:
- \snippet start_with.cpp full start_with sample
- \snippet output.txt full start_with sample
- */
- template<class Value0, class... ValueN>
- auto start_with(Value0 v0, ValueN... vn) const
+ /*! @copydoc rx-start_with.hpp
+ */
+ template<class... AN>
+ auto start_with(AN... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(rxo::start_with(std::move(v0), std::move(vn)...)(*(this_type*)nullptr))
+ -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return rxo::start_with(std::move(v0), std::move(vn)...)(*this);
+ return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-pairwise.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 9fb23de..30c5e5b 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -102,7 +102,6 @@ public:
#include "operators/rx-publish.hpp"
#include "operators/rx-ref_count.hpp"
#include "operators/rx-replay.hpp"
-#include "operators/rx-start_with.hpp"
#include "operators/rx-subscribe.hpp"
#include "operators/rx-subscribe_on.hpp"
@@ -350,6 +349,13 @@ struct skip_until_tag {
};
};
+struct start_with_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-start_with.hpp>");
+ };
+};
+
struct switch_if_empty_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index fbbd679..322872c 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -68,6 +68,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/skip.cpp
${TEST_DIR}/operators/skip_last.cpp
${TEST_DIR}/operators/skip_until.cpp
+ ${TEST_DIR}/operators/start_with.cpp
${TEST_DIR}/operators/subscribe_on.cpp
${TEST_DIR}/operators/switch_if_empty.cpp
${TEST_DIR}/operators/switch_on_next.cpp
diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp
index 77ee8f3..2704aa7 100644
--- a/Rx/v2/test/operators/group_by.cpp
+++ b/Rx/v2/test/operators/group_by.cpp
@@ -5,6 +5,7 @@
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-merge.hpp>
#include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-start_with.hpp>
#include <locale>
diff --git a/Rx/v2/test/operators/start_with.cpp b/Rx/v2/test/operators/start_with.cpp
new file mode 100644
index 0000000..ef192b3
--- /dev/null
+++ b/Rx/v2/test/operators/start_with.cpp
@@ -0,0 +1,221 @@
+#include "../test.h"
+#include <rxcpp/operators/rx-start_with.hpp>
+
+SCENARIO("start_with - source never emits or completes", "[start_with][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("start_with one value"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ | rxo::start_with(1)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains start_with value"){
+ auto required = rxu::to_vector({
+ on.next(200, 1)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("start_with - source completes without emitting items", "[start_with][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("start_with one value"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .start_with(5)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains start_with item and complete message"){
+ auto required = rxu::to_vector({
+ on.next(200, 5),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("start_with - source emits and completes", "[start_with][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.completed(250)
+ });
+
+ WHEN("start_with one value"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .start_with(5)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains start_with item and the items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(200, 5),
+ on.next(210, 2),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("start_with - sources terminates with an error", "[start_with][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("start_with on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("start_with one value"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .start_with(5)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains start_with item and an error"){
+ auto required = rxu::to_vector({
+ on.next(200, 5),
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("start_with several items - source emits and completes", "[start_with][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.completed(250)
+ });
+
+ WHEN("start_with one value"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .start_with(5, 6)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains start_with item and the items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(200, 5),
+ on.next(200, 6),
+ on.next(210, 2),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}