summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-11-30 12:23:04 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-11-30 16:47:50 -0800
commit1dd48669dacc5edafb64bb29afdf5fe22d2c0c6a (patch)
tree816433fa7086482ee7f796b8ddfda162de86f2f1 /Rx/v2
parent546ac574e2e9910b72d14e867f724d114d14184f (diff)
downloadRxCpp-1dd48669dacc5edafb64bb29afdf5fe22d2c0c6a.tar.gz
decouple finally from observable
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-finally.hpp76
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp26
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/finally.cpp200
6 files changed, 272 insertions, 40 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-finally.hpp b/Rx/v2/src/rxcpp/operators/rx-finally.hpp
index 45cbf89..4e4416c 100644
--- a/Rx/v2/src/rxcpp/operators/rx-finally.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-finally.hpp
@@ -2,6 +2,25 @@
#pragma once
+/*! \file rx-finally.hpp
+
+ \brief Add a new action at the end of the new observable that is returned.
+
+ \tparam LastCall the type of the action function
+
+ \param lc the action function
+
+ \return Observable that emits the same items as the source observable, then invokes the given action.
+
+ \sample
+ \snippet finally.cpp finally sample
+ \snippet output.txt finally sample
+
+ If the source observable generates an error, the final action is still being called:
+ \snippet finally.cpp error finally sample
+ \snippet output.txt error finally sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_FINALLY_HPP)
#define RXCPP_OPERATORS_RX_FINALLY_HPP
@@ -13,6 +32,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct finally_invalid_arguments {};
+
+template<class... AN>
+struct finally_invalid : public rxo::operator_base<finally_invalid_arguments<AN...>> {
+ using type = observable<finally_invalid_arguments<AN...>, finally_invalid<AN...>>;
+};
+template<class... AN>
+using finally_invalid_t = typename finally_invalid<AN...>::type;
+
template<class T, class LastCall>
struct finally
{
@@ -48,7 +77,7 @@ struct finally
dest.on_completed();
}
- static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, const last_call_type& lc) {
+ static subscriber<value_type, observer_type> make(dest_type d, const last_call_type& lc) {
auto dl = d.get_subscription();
composite_subscription cs;
dl.add(cs);
@@ -67,30 +96,39 @@ struct finally
}
};
-template<class LastCall>
-class finally_factory
-{
- typedef rxu::decay_t<LastCall> last_call_type;
- last_call_type last_call;
-public:
- finally_factory(last_call_type lc) : last_call(std::move(lc)) {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(finally<rxu::value_type_t<rxu::decay_t<Observable>>, last_call_type>(last_call))) {
- return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(finally<rxu::value_type_t<rxu::decay_t<Observable>>, last_call_type>(last_call));
- }
-};
-
}
-template<class LastCall>
-auto finally(LastCall lc)
- -> detail::finally_factory<LastCall> {
- return detail::finally_factory<LastCall>(std::move(lc));
+/*! @copydoc rx-finally.hpp
+*/
+template<class... AN>
+auto finally(AN&&... an)
+ -> operator_factory<finally_tag, AN...> {
+ return operator_factory<finally_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<finally_tag>
+{
+ template<class Observable, class LastCall,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class Finally = rxo::detail::finally<SourceValue, rxu::decay_t<LastCall>>>
+ static auto member(Observable&& o, LastCall&& lc)
+ -> decltype(o.template lift<SourceValue>(Finally(std::forward<LastCall>(lc)))) {
+ return o.template lift<SourceValue>(Finally(std::forward<LastCall>(lc)));
+ }
+
+ template<class... AN>
+ static operators::detail::finally_invalid_t<AN...> member(const AN&...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "finally takes (LastCall)");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 4d7edb1..0089085 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -191,6 +191,7 @@
#include "operators/rx-distinct_until_changed.hpp"
#include "operators/rx-element_at.hpp"
#include "operators/rx-filter.hpp"
+#include "operators/rx-finally.hpp"
#include "operators/rx-group_by.hpp"
#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-reduce.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index a48159b..8c3fb70 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1070,29 +1070,15 @@ public:
static_assert(sizeof...(AN) == 0, "timestamp() was passed too many arguments.");
}
- /*! Add a new action at the end of the new observable that is returned.
-
- \tparam LastCall the type of the action function
-
- \param lc the action function
-
- \return Observable that emits the same items as the source observable, then invokes the given action.
-
- \sample
- \snippet finally.cpp finally sample
- \snippet output.txt finally sample
-
- If the source observable generates an error, the final action is still being called:
- \snippet finally.cpp error finally sample
- \snippet output.txt error finally sample
- */
- template<class LastCall>
- auto finally(LastCall lc) const
+ /*! @copydoc rx-finally.hpp
+ */
+ template<class... AN>
+ auto finally(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc))))
+ -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
- return lift<T>(rxo::detail::finally<T, LastCall>(std::move(lc)));
+ return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
}
/*! If an error occurs, take the result from the Selector and subscribe to that instead.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index db25d23..f92de1b 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -102,7 +102,6 @@ public:
#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
-#include "operators/rx-finally.hpp"
#include "operators/rx-flat_map.hpp"
#include "operators/rx-lift.hpp"
#include "operators/rx-map.hpp"
@@ -207,6 +206,13 @@ struct filter_tag {
};
};
+struct finally_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-finally.hpp>");
+ };
+};
+
struct group_by_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 5b5aa38..55d1b84 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -44,6 +44,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/element_at.cpp
${TEST_DIR}/operators/exists.cpp
${TEST_DIR}/operators/filter.cpp
+ ${TEST_DIR}/operators/finally.cpp
${TEST_DIR}/operators/flat_map.cpp
${TEST_DIR}/operators/group_by.cpp
${TEST_DIR}/operators/ignore_elements.cpp
diff --git a/Rx/v2/test/operators/finally.cpp b/Rx/v2/test/operators/finally.cpp
new file mode 100644
index 0000000..eb7046c
--- /dev/null
+++ b/Rx/v2/test/operators/finally.cpp
@@ -0,0 +1,200 @@
+#include "../test.h"
+#include <rxcpp/operators/rx-finally.hpp>
+
+SCENARIO("finally - never", "[finally][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ long invoked = 0;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("finally action is set"){
+
+ auto res = w.start(
+ [xs, &invoked]() {
+ return xs
+ | rxo::finally([&invoked]() {
+ ++invoked;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("finally called once"){
+ REQUIRE(1 == invoked);
+ }
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("finally - empty", "[finally][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ long invoked = 0;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("finally action is set"){
+
+ auto res = w.start(
+ [xs, &invoked]() {
+ return xs
+ .finally([&invoked]() {
+ ++invoked;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("finally called once"){
+ REQUIRE(1 == invoked);
+ }
+
+ THEN("the output only contains complete message"){
+ auto required = rxu::to_vector({
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("finally - items emitted", "[finally][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ long invoked = 0;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("finally action is set"){
+
+ auto res = w.start(
+ [xs, &invoked]() {
+ return xs
+ .finally([&invoked]() {
+ ++invoked;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("finally called once"){
+ REQUIRE(1 == invoked);
+ }
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("finally - throw", "[finally][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ long invoked = 0;
+
+ std::runtime_error ex("finally on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("finally action is set"){
+
+ auto res = w.start(
+ [xs, &invoked]() {
+ return xs
+ .finally([&invoked]() {
+ ++invoked;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("finally called once"){
+ REQUIRE(1 == invoked);
+ }
+
+ THEN("the output only contains only error"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}