diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-05-25 17:44:55 +0300 |
---|---|---|
committer | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-05-25 17:44:55 +0300 |
commit | 58339cc9e57c382c721586adf998d8a72ada27e1 (patch) | |
tree | a08fc82c8eedabab13443c77b783aade66b5b3c4 | |
parent | eb774e726d9b9838190f21c6fbaf55059b5e550d (diff) | |
download | RxCpp-58339cc9e57c382c721586adf998d8a72ada27e1.tar.gz |
add skip_last operator
-rw-r--r-- | Rx/v2/examples/doxygen/skip_last.cpp | 14 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-skip_last.hpp | 119 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 22 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/skip_last.cpp | 278 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 1 | ||||
-rw-r--r-- | projects/doxygen/CMakeLists.txt | 1 |
8 files changed, 437 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/skip_last.cpp b/Rx/v2/examples/doxygen/skip_last.cpp new file mode 100644 index 0000000..29d6aad --- /dev/null +++ b/Rx/v2/examples/doxygen/skip_last.cpp @@ -0,0 +1,14 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("skip_last sample"){ + printf("//! [skip_last sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).skip_last(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [skip_last sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp new file mode 100644 index 0000000..1a9681a --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp @@ -0,0 +1,119 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_SKIP_LAST_HPP) +#define RXCPP_OPERATORS_RX_SKIP_LAST_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T, class Observable, class Count> +struct skip_last : public operator_base<T> +{ + typedef rxu::decay_t<Observable> source_type; + typedef rxu::decay_t<Count> count_type; + + typedef std::queue<T> queue_type; + typedef typename queue_type::size_type queue_size_type; + + struct values + { + values(source_type s, count_type t) + : source(std::move(s)) + , count(static_cast<queue_size_type>(t)) + { + } + source_type source; + queue_size_type count; + }; + values initial; + + skip_last(source_type s, count_type t) + : initial(std::move(s), std::move(t)) + { + } + + template<class Subscriber> + void on_subscribe(const Subscriber& s) const { + + typedef Subscriber output_type; + struct state_type + : public std::enable_shared_from_this<state_type> + , public values + { + state_type(const values& i, const output_type& oarg) + : values(i) + , out(oarg) + { + } + queue_type items; + output_type out; + }; + // take a copy of the values for each subscription + auto state = std::make_shared<state_type>(initial, s); + + composite_subscription source_lifetime; + + s.add(source_lifetime); + + state->source.subscribe( + // split subscription lifetime + source_lifetime, + // on_next + [state](T t) { + if(state->count > 0) { + if (state->items.size() == state->count) { + state->out.on_next(std::move(state->items.front())); + state->items.pop(); + } + state->items.push(t); + } else { + state->out.on_next(t); + } + }, + // on_error + [state](std::exception_ptr e) { + state->out.on_error(e); + }, + // on_completed + [state]() { + state->out.on_completed(); + } + ); + } +}; + +template<class T> +class skip_last_factory +{ + typedef rxu::decay_t<T> count_type; + count_type count; +public: + skip_last_factory(count_type t) : count(std::move(t)) {} + template<class Observable> + auto operator()(Observable&& source) + -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_last<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>> { + return observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_last<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>>( + skip_last<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>(std::forward<Observable>(source), count)); + } +}; + +} + +template<class T> +auto skip_last(T&& t) + -> detail::skip_last_factory<T> { + return detail::skip_last_factory<T>(std::forward<T>(t)); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index e3dca0e..f78ada4 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -3035,6 +3035,28 @@ public: rxo::detail::skip<T, this_type, Count>(*this, t)); } + /*! Make new observable with skipped last count items from this observable. + + \tparam Count the type of the items counter + + \param t the number of last items to skip + + \return An observable that is identical to the source observable except that it does not emit the last t items that the source observable emits. + + \sample + \snippet skip_last.cpp skip sample + \snippet output.txt skip sample + */ + template<class Count> + auto skip_last(Count t) const + /// \cond SHOW_SERVICE_MEMBERS + -> observable<T, rxo::detail::skip_last<T, this_type, Count>> + /// \endcond + { + return observable<T, rxo::detail::skip_last<T, this_type, Count>>( + rxo::detail::skip_last<T, this_type, Count>(*this, t)); + } + /*! Make new observable with items skipped until on_next occurs on the trigger observable \tparam TriggerSource the type of the trigger observable diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 3b792a2..a8d643a 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -70,6 +70,7 @@ namespace rxo=operators; #include "operators/rx-sample_time.hpp" #include "operators/rx-scan.hpp" #include "operators/rx-skip.hpp" +#include "operators/rx-skip_last.hpp" #include "operators/rx-skip_until.hpp" #include "operators/rx-start_with.hpp" #include "operators/rx-subscribe.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index d8f9033..780884b 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -59,6 +59,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/sample.cpp ${TEST_DIR}/operators/scan.cpp ${TEST_DIR}/operators/skip.cpp + ${TEST_DIR}/operators/skip_last.cpp ${TEST_DIR}/operators/skip_until.cpp ${TEST_DIR}/operators/subscribe_on.cpp ${TEST_DIR}/operators/switch_on_next.cpp diff --git a/Rx/v2/test/operators/skip_last.cpp b/Rx/v2/test/operators/skip_last.cpp new file mode 100644 index 0000000..f67e965 --- /dev/null +++ b/Rx/v2/test/operators/skip_last.cpp @@ -0,0 +1,278 @@ +#include "../test.h" + +SCENARIO("skip last 0", "[skip_last][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), + on.completed(250) + }); + + WHEN("0 last values are skipped"){ + + auto res = w.start( + [xs]() { + return xs + .skip_last(0) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains the completion event"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("skip last 1", "[skip_last][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), + on.completed(250) + }); + + WHEN("1 last value is skipped"){ + + auto res = w.start( + [xs]() { + return xs + .skip_last(1) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(220, 2), + on.next(230, 3), + on.next(240, 4), + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("skip last 2", "[skip_last][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), + on.completed(250) + }); + + WHEN("2 last values are skipped"){ + + auto res = w.start( + [xs]() { + return xs + .skip_last(2) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(230, 2), + on.next(240, 3), + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("skip last 10, complete before all elements are skipped", "[skip_last][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), + on.completed(250) + }); + + WHEN("10 last values are skipped"){ + + auto res = w.start( + [xs]() { + return xs + .skip_last(10) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains items sent while subscribed"){ + auto required = rxu::to_vector({ + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("no items to skip_last", "[skip_last][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("2 last values are skipped"){ + + auto res = w.start( + [so, xs]() { + return xs + .skip_last(2) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("skip_last, source observable emits an error", "[skip_last][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("2 last values are skipped"){ + + auto res = w.start( + [so, xs]() { + return xs + .skip_last(2) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains only an error message"){ + auto required = rxu::to_vector({ + on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 5e8e0ab..1e39f46 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -62,6 +62,7 @@ set(RX_SOURCES ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-scan.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-skip.hpp + ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-start_with.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt index eeae484..7f8f749 100644 --- a/projects/doxygen/CMakeLists.txt +++ b/projects/doxygen/CMakeLists.txt @@ -88,6 +88,7 @@ if(DOXYGEN_FOUND) ${DOXY_EXAMPLES_SRC_DIR}/scan.cpp ${DOXY_EXAMPLES_SRC_DIR}/scope.cpp ${DOXY_EXAMPLES_SRC_DIR}/skip.cpp + ${DOXY_EXAMPLES_SRC_DIR}/skip_last.cpp ${DOXY_EXAMPLES_SRC_DIR}/skip_until.cpp ${DOXY_EXAMPLES_SRC_DIR}/start_with.cpp ${DOXY_EXAMPLES_SRC_DIR}/subscribe.cpp |