diff options
-rw-r--r-- | Rx/v2/examples/doxygen/element_at.cpp | 14 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-element-at.hpp | 103 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 18 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/element_at.cpp | 292 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 1 |
7 files changed, 430 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/element_at.cpp b/Rx/v2/examples/doxygen/element_at.cpp new file mode 100644 index 0000000..0b52776 --- /dev/null +++ b/Rx/v2/examples/doxygen/element_at.cpp @@ -0,0 +1,14 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("element_at sample"){ + printf("//! [element_at sample]\n"); + auto values = rxcpp::observable<>::range(1, 7).element_at(3); + values. + subscribe( + [](int v){printf("OnNext: %d\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [element_at sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-element-at.hpp b/Rx/v2/src/rxcpp/operators/rx-element-at.hpp new file mode 100644 index 0000000..a894598 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-element-at.hpp @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_ELEMENT_AT_HPP) +#define RXCPP_OPERATORS_RX_ELEMENT_AT_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T> +struct element_at { + typedef rxu::decay_t<T> source_value_type; + + struct element_at_values { + element_at_values(int i) + : index(i) + { + } + int index; + }; + + element_at_values initial; + + element_at(int i) + : initial(i) + { + } + + template<class Subscriber> + struct element_at_observer : public element_at_values + { + typedef element_at_observer<Subscriber> this_type; + typedef source_value_type value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<value_type, this_type> observer_type; + dest_type dest; + mutable int current; + + element_at_observer(dest_type d, element_at_values v) + : element_at_values(v), + dest(d), + current(0) + { + } + void on_next(source_value_type v) const { + if (current++ == this->index) { + dest.on_next(v); + dest.on_completed(); + } + } + void on_error(std::exception_ptr e) const { + dest.on_error(e); + } + void on_completed() const { + if(current <= this->index) { + dest.on_error(std::make_exception_ptr(std::range_error("index is out of bounds"))); + } + } + + static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, element_at_values v) { + return make_subscriber<value_type>(d, this_type(d, v)); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(element_at_observer<Subscriber>::make(std::move(dest), initial)) { + return element_at_observer<Subscriber>::make(std::move(dest), initial); + } +}; + +class element_at_factory +{ + int index; +public: + element_at_factory(int i) : index(i) {} + + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(element_at<rxu::value_type_t<rxu::decay_t<Observable>>>(index))) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(element_at<rxu::value_type_t<rxu::decay_t<Observable>>>(index)); + } +}; + +} + + +inline auto element_at(int index) + -> detail::element_at_factory { + return detail::element_at_factory(index); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 802b637..fe5f2d8 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -870,6 +870,24 @@ public: return lift<T>(rxo::detail::distinct_until_changed<T>()); } + /*! Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission. + + \param index the index of the element to return. + + \return An observable that emit an item located at a specified index location. + + \sample + \snippet element_at.cpp element_at sample + \snippet output.txt element_at sample + */ + auto element_at(int index) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::element_at<T>(index))) + /// \endcond + { + return lift<T>(rxo::detail::element_at<T>(index)); + } + /*! Rerurn an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. \param count the maximum size of each window before it should be completed diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 914b94a..689c8d1 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -44,6 +44,7 @@ namespace rxo=operators; #include "operators/rx-connect_forever.hpp" #include "operators/rx-distinct.hpp" #include "operators/rx-distinct_until_changed.hpp" +#include "operators/rx-element-at.hpp" #include "operators/rx-filter.hpp" #include "operators/rx-finally.hpp" #include "operators/rx-flat_map.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 3f08d74..9fd13fd 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -35,6 +35,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/concat_map.cpp ${TEST_DIR}/operators/distinct.cpp ${TEST_DIR}/operators/distinct_until_changed.cpp + ${TEST_DIR}/operators/element_at.cpp ${TEST_DIR}/operators/filter.cpp ${TEST_DIR}/operators/flat_map.cpp ${TEST_DIR}/operators/group_by.cpp diff --git a/Rx/v2/test/operators/element_at.cpp b/Rx/v2/test/operators/element_at.cpp new file mode 100644 index 0000000..06c3be1 --- /dev/null +++ b/Rx/v2/test/operators/element_at.cpp @@ -0,0 +1,292 @@ +#include "../test.h" + +SCENARIO("element_at - never", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(3); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("element_at - empty", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("element_at on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(0); + } + ); + + THEN("the output only contains an error"){ + auto required = rxu::to_vector({ + on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("element_at - first", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.completed(250) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(0); + } + ); + + THEN("the output contains the first element"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.completed(210) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 210) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("element_at - throw", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("element_at on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(3); + } + ); + + THEN("the output contains an error"){ + auto required = rxu::to_vector({ + on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("element_at - non-first", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), // + on.next(240, 5), + on.completed(250) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(2); + } + ); + + THEN("the output contains the element at requested index"){ + auto required = rxu::to_vector({ + on.next(230, 4), + on.completed(230) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 230) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("element_at - last in a sequence", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), // + on.completed(250) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(3); + } + ); + + THEN("the output contains the element at requested index"){ + auto required = rxu::to_vector({ + on.next(240, 5), + on.completed(240) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 240) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("element_at - invalid index", "[element_at][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("element_at on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), // #0 + on.next(220, 3), // #1 + on.next(230, 4), // #2 + on.next(240, 5), // #3 + on.completed(250) + }); + + WHEN("element_at is taken"){ + + auto res = w.start( + [xs]() { + return xs.element_at(4); + } + ); + + THEN("the output contains an error"){ + auto required = rxu::to_vector({ + on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index e6f3d68..41d73ff 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -36,6 +36,7 @@ set(RX_SOURCES ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-distinct.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp + ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-element-at.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-filter.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-finally.hpp ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp |