summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/examples/doxygen/element_at.cpp14
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-element-at.hpp103
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp18
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/element_at.cpp292
-rw-r--r--projects/CMake/CMakeLists.txt1
7 files changed, 430 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/element_at.cpp b/Rx/v2/examples/doxygen/element_at.cpp
new file mode 100644
index 0000000..0b52776
--- /dev/null
+++ b/Rx/v2/examples/doxygen/element_at.cpp
@@ -0,0 +1,14 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("element_at sample"){
+ printf("//! [element_at sample]\n");
+ auto values = rxcpp::observable<>::range(1, 7).element_at(3);
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [element_at sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-element-at.hpp b/Rx/v2/src/rxcpp/operators/rx-element-at.hpp
new file mode 100644
index 0000000..a894598
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-element-at.hpp
@@ -0,0 +1,103 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_ELEMENT_AT_HPP)
+#define RXCPP_OPERATORS_RX_ELEMENT_AT_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T>
+struct element_at {
+ typedef rxu::decay_t<T> source_value_type;
+
+ struct element_at_values {
+ element_at_values(int i)
+ : index(i)
+ {
+ }
+ int index;
+ };
+
+ element_at_values initial;
+
+ element_at(int i)
+ : initial(i)
+ {
+ }
+
+ template<class Subscriber>
+ struct element_at_observer : public element_at_values
+ {
+ typedef element_at_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+ mutable int current;
+
+ element_at_observer(dest_type d, element_at_values v)
+ : element_at_values(v),
+ dest(d),
+ current(0)
+ {
+ }
+ void on_next(source_value_type v) const {
+ if (current++ == this->index) {
+ dest.on_next(v);
+ dest.on_completed();
+ }
+ }
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ if(current <= this->index) {
+ dest.on_error(std::make_exception_ptr(std::range_error("index is out of bounds")));
+ }
+ }
+
+ static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, element_at_values v) {
+ return make_subscriber<value_type>(d, this_type(d, v));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(element_at_observer<Subscriber>::make(std::move(dest), initial)) {
+ return element_at_observer<Subscriber>::make(std::move(dest), initial);
+ }
+};
+
+class element_at_factory
+{
+ int index;
+public:
+ element_at_factory(int i) : index(i) {}
+
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(element_at<rxu::value_type_t<rxu::decay_t<Observable>>>(index))) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(element_at<rxu::value_type_t<rxu::decay_t<Observable>>>(index));
+ }
+};
+
+}
+
+
+inline auto element_at(int index)
+ -> detail::element_at_factory {
+ return detail::element_at_factory(index);
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 802b637..fe5f2d8 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -870,6 +870,24 @@ public:
return lift<T>(rxo::detail::distinct_until_changed<T>());
}
+ /*! Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission.
+
+ \param index the index of the element to return.
+
+ \return An observable that emit an item located at a specified index location.
+
+ \sample
+ \snippet element_at.cpp element_at sample
+ \snippet output.txt element_at sample
+ */
+ auto element_at(int index) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::element_at<T>(index)))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::element_at<T>(index));
+ }
+
/*! Rerurn an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
\param count the maximum size of each window before it should be completed
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 914b94a..689c8d1 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -44,6 +44,7 @@ namespace rxo=operators;
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-distinct.hpp"
#include "operators/rx-distinct_until_changed.hpp"
+#include "operators/rx-element-at.hpp"
#include "operators/rx-filter.hpp"
#include "operators/rx-finally.hpp"
#include "operators/rx-flat_map.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 3f08d74..9fd13fd 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -35,6 +35,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/concat_map.cpp
${TEST_DIR}/operators/distinct.cpp
${TEST_DIR}/operators/distinct_until_changed.cpp
+ ${TEST_DIR}/operators/element_at.cpp
${TEST_DIR}/operators/filter.cpp
${TEST_DIR}/operators/flat_map.cpp
${TEST_DIR}/operators/group_by.cpp
diff --git a/Rx/v2/test/operators/element_at.cpp b/Rx/v2/test/operators/element_at.cpp
new file mode 100644
index 0000000..06c3be1
--- /dev/null
+++ b/Rx/v2/test/operators/element_at.cpp
@@ -0,0 +1,292 @@
+#include "../test.h"
+
+SCENARIO("element_at - never", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(3);
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("element_at - empty", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("element_at on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(0);
+ }
+ );
+
+ THEN("the output only contains an error"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("element_at - first", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.completed(250)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(0);
+ }
+ );
+
+ THEN("the output contains the first element"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.completed(210)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 210)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("element_at - throw", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("element_at on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(3);
+ }
+ );
+
+ THEN("the output contains an error"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("element_at - non-first", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4), //
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(2);
+ }
+ );
+
+ THEN("the output contains the element at requested index"){
+ auto required = rxu::to_vector({
+ on.next(230, 4),
+ on.completed(230)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 230)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("element_at - last in a sequence", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5), //
+ on.completed(250)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(3);
+ }
+ );
+
+ THEN("the output contains the element at requested index"){
+ auto required = rxu::to_vector({
+ on.next(240, 5),
+ on.completed(240)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 240)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("element_at - invalid index", "[element_at][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("element_at on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2), // #0
+ on.next(220, 3), // #1
+ on.next(230, 4), // #2
+ on.next(240, 5), // #3
+ on.completed(250)
+ });
+
+ WHEN("element_at is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.element_at(4);
+ }
+ );
+
+ THEN("the output contains an error"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index e6f3d68..41d73ff 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -36,6 +36,7 @@ set(RX_SOURCES
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
+ ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-element-at.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-filter.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-finally.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp