summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-05-25 17:44:55 +0300
committerGrigoriy Chudnov <g.chudnov@gmail.com>2016-05-25 17:44:55 +0300
commit58339cc9e57c382c721586adf998d8a72ada27e1 (patch)
treea08fc82c8eedabab13443c77b783aade66b5b3c4
parenteb774e726d9b9838190f21c6fbaf55059b5e550d (diff)
downloadRxCpp-58339cc9e57c382c721586adf998d8a72ada27e1.tar.gz
add skip_last operator
-rw-r--r--Rx/v2/examples/doxygen/skip_last.cpp14
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip_last.hpp119
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp22
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/skip_last.cpp278
-rw-r--r--projects/CMake/CMakeLists.txt1
-rw-r--r--projects/doxygen/CMakeLists.txt1
8 files changed, 437 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/skip_last.cpp b/Rx/v2/examples/doxygen/skip_last.cpp
new file mode 100644
index 0000000..29d6aad
--- /dev/null
+++ b/Rx/v2/examples/doxygen/skip_last.cpp
@@ -0,0 +1,14 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("skip_last sample"){
+ printf("//! [skip_last sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).skip_last(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [skip_last sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
new file mode 100644
index 0000000..1a9681a
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
@@ -0,0 +1,119 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_SKIP_LAST_HPP)
+#define RXCPP_OPERATORS_RX_SKIP_LAST_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Observable, class Count>
+struct skip_last : public operator_base<T>
+{
+ typedef rxu::decay_t<Observable> source_type;
+ typedef rxu::decay_t<Count> count_type;
+
+ typedef std::queue<T> queue_type;
+ typedef typename queue_type::size_type queue_size_type;
+
+ struct values
+ {
+ values(source_type s, count_type t)
+ : source(std::move(s))
+ , count(static_cast<queue_size_type>(t))
+ {
+ }
+ source_type source;
+ queue_size_type count;
+ };
+ values initial;
+
+ skip_last(source_type s, count_type t)
+ : initial(std::move(s), std::move(t))
+ {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(const Subscriber& s) const {
+
+ typedef Subscriber output_type;
+ struct state_type
+ : public std::enable_shared_from_this<state_type>
+ , public values
+ {
+ state_type(const values& i, const output_type& oarg)
+ : values(i)
+ , out(oarg)
+ {
+ }
+ queue_type items;
+ output_type out;
+ };
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_type>(initial, s);
+
+ composite_subscription source_lifetime;
+
+ s.add(source_lifetime);
+
+ state->source.subscribe(
+ // split subscription lifetime
+ source_lifetime,
+ // on_next
+ [state](T t) {
+ if(state->count > 0) {
+ if (state->items.size() == state->count) {
+ state->out.on_next(std::move(state->items.front()));
+ state->items.pop();
+ }
+ state->items.push(t);
+ } else {
+ state->out.on_next(t);
+ }
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ state->out.on_error(e);
+ },
+ // on_completed
+ [state]() {
+ state->out.on_completed();
+ }
+ );
+ }
+};
+
+template<class T>
+class skip_last_factory
+{
+ typedef rxu::decay_t<T> count_type;
+ count_type count;
+public:
+ skip_last_factory(count_type t) : count(std::move(t)) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_last<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>> {
+ return observable<rxu::value_type_t<rxu::decay_t<Observable>>, skip_last<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>>(
+ skip_last<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>(std::forward<Observable>(source), count));
+ }
+};
+
+}
+
+template<class T>
+auto skip_last(T&& t)
+ -> detail::skip_last_factory<T> {
+ return detail::skip_last_factory<T>(std::forward<T>(t));
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index e3dca0e..f78ada4 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -3035,6 +3035,28 @@ public:
rxo::detail::skip<T, this_type, Count>(*this, t));
}
+ /*! Make new observable with skipped last count items from this observable.
+
+ \tparam Count the type of the items counter
+
+ \param t the number of last items to skip
+
+ \return An observable that is identical to the source observable except that it does not emit the last t items that the source observable emits.
+
+ \sample
+ \snippet skip_last.cpp skip sample
+ \snippet output.txt skip sample
+ */
+ template<class Count>
+ auto skip_last(Count t) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::skip_last<T, this_type, Count>>
+ /// \endcond
+ {
+ return observable<T, rxo::detail::skip_last<T, this_type, Count>>(
+ rxo::detail::skip_last<T, this_type, Count>(*this, t));
+ }
+
/*! Make new observable with items skipped until on_next occurs on the trigger observable
\tparam TriggerSource the type of the trigger observable
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 3b792a2..a8d643a 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -70,6 +70,7 @@ namespace rxo=operators;
#include "operators/rx-sample_time.hpp"
#include "operators/rx-scan.hpp"
#include "operators/rx-skip.hpp"
+#include "operators/rx-skip_last.hpp"
#include "operators/rx-skip_until.hpp"
#include "operators/rx-start_with.hpp"
#include "operators/rx-subscribe.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index d8f9033..780884b 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -59,6 +59,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/sample.cpp
${TEST_DIR}/operators/scan.cpp
${TEST_DIR}/operators/skip.cpp
+ ${TEST_DIR}/operators/skip_last.cpp
${TEST_DIR}/operators/skip_until.cpp
${TEST_DIR}/operators/subscribe_on.cpp
${TEST_DIR}/operators/switch_on_next.cpp
diff --git a/Rx/v2/test/operators/skip_last.cpp b/Rx/v2/test/operators/skip_last.cpp
new file mode 100644
index 0000000..f67e965
--- /dev/null
+++ b/Rx/v2/test/operators/skip_last.cpp
@@ -0,0 +1,278 @@
+#include "../test.h"
+
+SCENARIO("skip last 0", "[skip_last][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("0 last values are skipped"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_last(0)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains the completion event"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip last 1", "[skip_last][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("1 last value is skipped"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_last(1)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(220, 2),
+ on.next(230, 3),
+ on.next(240, 4),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip last 2", "[skip_last][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("2 last values are skipped"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_last(2)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(230, 2),
+ on.next(240, 3),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip last 10, complete before all elements are skipped", "[skip_last][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("10 last values are skipped"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_last(10)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("no items to skip_last", "[skip_last][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("2 last values are skipped"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ .skip_last(2)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("skip_last, source observable emits an error", "[skip_last][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("2 last values are skipped"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ .skip_last(2)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains only an error message"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 5e8e0ab..1e39f46 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -62,6 +62,7 @@ set(RX_SOURCES
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-scan.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-skip.hpp
+ ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-start_with.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-subscribe.hpp
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index eeae484..7f8f749 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -88,6 +88,7 @@ if(DOXYGEN_FOUND)
${DOXY_EXAMPLES_SRC_DIR}/scan.cpp
${DOXY_EXAMPLES_SRC_DIR}/scope.cpp
${DOXY_EXAMPLES_SRC_DIR}/skip.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/skip_last.cpp
${DOXY_EXAMPLES_SRC_DIR}/skip_until.cpp
${DOXY_EXAMPLES_SRC_DIR}/start_with.cpp
${DOXY_EXAMPLES_SRC_DIR}/subscribe.cpp